KAFKA-19712: ProcessorStateManager delegates offset tracking to stores#21738
KAFKA-19712: ProcessorStateManager delegates offset tracking to stores#21738bbejeck merged 3 commits intoapache:trunkfrom
Conversation
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
Disclosure: the changes here were all hand-written and verified by me, but I used Claude Code to help me break up a large set of changes into multiple PRs, and to analyse the changes for issues that had not been caught by tests. |
bbejeck
left a comment
There was a problem hiding this comment.
Thanks for the PR @nicktelford - overall LGTM with a couple of small issues to address before we merge
| private boolean taskDirIsEmpty(final File taskDir) { | ||
| final File[] storeDirs = taskDir.listFiles(pathname -> | ||
| !pathname.getName().equals(CHECKPOINT_FILE_NAME)); | ||
| !pathname.getName().equals(LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME)); |
There was a problem hiding this comment.
I think this needs to be updated as it looks like the filter is going miss the new checkpoint file names of checkpoint_<store name>
There was a problem hiding this comment.
Good spot, I also found another place that does a similar check. Both have been updated to use startsWith instead of equals, to handle both legacy and per-store checkpoint file names.
| } | ||
|
|
||
| @Test | ||
| public void shouldDeleteCheckPointFileIfEosEnabled() throws IOException { |
There was a problem hiding this comment.
I'm wondering if we should keep this test for now, won't we have users running in the pre-txn statestore mode for a while?
There was a problem hiding this comment.
Deleting checkpoint files is now entirely handled by LegacyCheckpointingStateStore, so I think this is covered by this test?
The method this test calls, stateMgr.deleteCheckPointFileIfEOSEnabled, has been completely removed, because (under EOS) we now delete the checkpoint file in init() and only ever write it in close(). This ensures we don't resurrect KAFKA-10362.
| for (final StateStoreMetadata store : stores.values()) { | ||
| if (store.corrupted) { | ||
| log.error("Tried to initialize store offsets for corrupted store {}", store); | ||
| throw new ProcessorStateException( |
There was a problem hiding this comment.
This is correct, but is there a chance this could lead to a behavior change since the previous code only threw IllegalStateException meaning this might land in a catch block it didn't before?
There was a problem hiding this comment.
This is actually for backwards compatibility: the old throw here was inside a try block, which actually caught the IllegalStateException and wrapped it in a ProcessorStateException.
Since the try block was removed (as we're no longer working with files in this method), we now just directly wrap the exception when we throw it.
| logPrefix), e); | ||
| } | ||
|
|
||
| stateDirectory.updateTaskOffsets(taskId, changelogOffsets()); |
There was a problem hiding this comment.
This used to be withing the try/catch block but now if there's an error it will bubble up and possibly escape handling since it's no longer going throw either TaskCorruptedException or ProcessorStateException
There was a problem hiding this comment.
Ahh yes. Looking at the implementation of updateTaskOffsets, it seems the only exception that would have previously been caught here is a StreamsException (wrapping an IllegalStateException), thrown when one of the offsets is negative.
I actually think the old behaviour might have been a bit of a bug (wrapping a StreamsException in a ProcessorStateException seems weird to me). But this exception should never be thrown anyway, because it would require an offset to somehow be negative (which I think would indicate either a bug or data corruption).
Regardless, I'm restoring the try/catch block, locally around this statement, but with a slightly re-worded error message.
We have two places that check against the `.checkpoint` name: 1. Checks store names don't conflict with `.checkpoint` files. 2. Ignores `.checkpoint` files when determining if a state directory is empty. These need to both be updated to include per-store checkpoint files, that are named `.checkpoint_<store name>`. To do this, we just change our equality check for a prefix check; ensuring that any file/store that starts with `.checkpoint` is captured.
If a negative offset is passed into `updateTaskOffsets`, it will throw a `StreamsException`, wrapping an `IllegalStateException`. This was previously caught and wrapped in a `ProcessorStateException`, so we should restore this behaviour, even though it should be essentially impossible without a bug or data corruption.
|
Merged #21738 into trunk |
|
@nicktelford @bbejeck Can you please validate that kafkatest.tests.streams.streams_application_upgrade_test.StreamsUpgradeTest.test_app_upgrade works with these changes? It seems it is failing starting with this commit. Claude analysis: The previous analysis identified KAFKA-19434 (initializeStartupStores) as the cause. While KAFKA-19434 (Feb 20) introduced the startup initialization code path, the failure only started recently because KAFKA-19712 (commit 5740a26, March 16) fundamentally changed how offsets are read within that path: Before KAFKA-19712: initializeStoreOffsetsFromCheckpoint() read offsets from the .checkpoint file and used changelogOffsetFromCheckpointedOffset() to convert the OFFSET_UNKNOWN sentinel (-4) back to null. This properly handled state written by older Kafka versions. After KAFKA-19712: The method was rewritten to initializeStoreOffsets(), which calls store.stateStore.committedOffset(store.changelogPartition) instead of reading the checkpoint file. RocksDB stores now manage their own offsets (managesOffsets() == true) and are no longer wrapped in LegacyCheckpointingStateStore. The changelogOffsetFromCheckpointedOffset() method — which handled the -4 → null conversion — was removed entirely. When opening state directories written by an older Kafka Streams version during upgrade, the new RocksDB store's committedOffset() returns raw values that were never intended to be exposed directly. This produces the -3 value (from -4 + 1) that fails the sentinel validation in sumOfChangelogOffsets(). KAFKA-20257 (commit 823f0cf, same day) makes the same delegation change for GlobalStateManagerImpl and may have similar upgrade implications. Both commits landed on March 16 ~17:15 UTC, approximately 4 hours before the test workflow started (21:00 UTC), confirming they were in the tested build. |
@lucasbru Thanks for letting me know. I keep forgetting to run the smoke tests! 😬 Thanks to your Claude analysis, I think I've identified the bug: when migrating legacy offsets from I'm just running the test suites now to validate the fix, and if it all passes, I'll raise a PR ASAP. Will tag you and Bill as reviewers. |
As part of KIP-1035, we want to transition away from task-specific
.checkpointfiles, and instead delegate offset management toStateStores.We now have a
LegacyCheckpointingStateStorewrapper to encapsulate themanagement of offsets for
StateStoreimplementations that do not knowhow to manage their own offsets (i.e. for which
managesOffsets() == false).As of KAFKA-20212,
RocksDBStorenow knows how to manage its ownoffsets, so it will not be wrapped in a
LegacyCheckpointingStateStore;only user-defined persistent stores will use this wrapper.
Corresponding changes to
GlobalStateManagerImplwill be submittedindependently, as KAFKA-20257.
Until both
ProcessorStateManagerandGlobalStateManagerImplhavebeen updated, the
StateManagerinterface must remain as-is. Therefore,the
flushandcheckpointmethods will not be consolidated until alater PR, which will clean up the interface and its usage by
Taskandfriends.
Reviewers: Bill Bejeck bbejeck@apache.org